package com.shujia.spark.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Spark job that prints the score records of the students whose results are
  * the most uneven across courses.
  *
  * Steps:
  *  1. Normalize every score by dividing it by the third column of the matching
  *     course record, because courses are scored on different scales.
  *  2. Compute the population variance of each student's normalized scores.
  *  3. Keep the 100 students with the largest variance.
  *  4. Print every line of the score file that belongs to those students.
  *
  * Comma-separated input layout, as read by this code:
  *  - data/score.txt : field 0 = student id, field 1 = course id, field 2 = score
  *  - data/course.txt: field 0 = course id, field 2 = normalization divisor
  *    (presumably the course's full mark -- confirm against the data set)
  */
object Demo17Student2 {

  /** Number of most-uneven students whose score lines are printed. */
  private val TopN: Int = 100

  /**
    * Entry point: runs the job in local mode and prints the selected score lines.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setAppName("student")
      .setMaster("local")

    val sc = new SparkContext(conf)

    // Always release the context, even when a stage fails.
    try {
      // Raw score lines; reused at the end to print the selected students' records.
      val scoresRDD: RDD[String] = sc.textFile("data/score.txt")

      val courseRDD: RDD[String] = sc.textFile("data/course.txt")

      // Parse each score line once: courseId -> (studentId, rawScore).
      // Only the fields needed downstream are shuffled by the join.
      val scoreByCourseRDD: RDD[(String, (String, Double))] = scoresRDD.map { line =>
        val fields: Array[String] = line.split(",")
        (fields(1), (fields(0), fields(2).toDouble))
      }

      // courseId -> divisor used to normalize scores of that course.
      val divisorByCourseRDD: RDD[(String, Double)] = courseRDD.map { line =>
        val fields: Array[String] = line.split(",")
        (fields(0), fields(2).toDouble)
      }

      // Join the score records with the course records on the course id and
      // normalize each score. A non-positive divisor would yield Infinity/NaN,
      // and NaN sorts as the largest Double, which would push such students to
      // the top of the ranking -- so those records are skipped.
      val normalizedRDD: RDD[(String, Double)] = scoreByCourseRDD
        .join(divisorByCourseRDD)
        .filter { case (_, (_, divisor)) => divisor > 0 }
        .map { case (_, ((studentId, score), divisor)) => (studentId, score / divisor) }

      // Group the normalized scores by student id and compute their variance.
      val varianceRDD: RDD[(String, Double)] = normalizedRDD
        .groupByKey()
        .mapValues(scores => variance(scores))

      // Sort by variance in descending order and keep the first TopN students.
      val topStudents: Array[(String, Double)] =
        varianceRDD.sortBy(_._2, ascending = false).take(TopN)

      // Ids of the most uneven students, shipped to the executors once as a Set
      // so the membership test below is a hash lookup rather than an array scan.
      val topIds = sc.broadcast(topStudents.map(_._1).toSet)

      // Select the original score lines of those students.
      val topScoreLines: RDD[String] = scoresRDD.filter { line =>
        topIds.value.contains(line.split(",")(0))
      }

      topScoreLines.foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
    * Population variance: the mean of the squared deviations from the mean.
    *
    * Note that no square root is taken, so this is the variance rather than the
    * standard deviation; both produce the same ranking.
    *
    * @param values the values to measure; an empty input yields NaN
    * @return the population variance of `values`
    */
  private def variance(values: Iterable[Double]): Double = {
    val xs: Vector[Double] = values.toVector
    val mean: Double = xs.sum / xs.size
    val squaredDeviations: Vector[Double] = xs.map(x => (x - mean) * (x - mean))
    squaredDeviations.sum / xs.size
  }

}
